package com.sencorsta.ids.core.tcp.socket.server;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.sencorsta.ids.core.configure.SysConfig;
import com.sencorsta.ids.core.entity.ClientEvent;
import com.sencorsta.ids.core.function.FunctionSystem;
import com.sencorsta.ids.core.log.Out;
import com.sencorsta.ids.core.tcp.socket.NetHandler;
import com.sencorsta.ids.core.tcp.socket.protocol.RpcMessage;

/**
 * 　　* @description: RPC消息调度程序
 * 　　* @author TAO
 * 　　* @date 2019/6/12 17:20
 */
public abstract class RpcMessageDispatcher implements Runnable {
    public String functionName = "Dispatcher";

    private static final int __CAPACITY__ = SysConfig.getInstance().getInt("client.dispatcher.capacity", 10000);
    //private static final int __CAPACITY__ = SysConfig.getInstance().getInt("client.dispatcher.capacity", Integer.MAX_VALUE);
    private static final int __WARN_COUNT__ = 10000;
    private static final boolean __MONITOR_ENABLE__ = SysConfig.getInstance().getBoolean("client.monitor.enable", true);
    /**
     * 是否运行
     */
    private boolean __running__ = true;

    //private static ExecutorService __EXECUTOR__ = Executors.newCachedThreadPool();

    private static final SynchronousQueue __QUEUE__ = new SynchronousQueue();
    private final LinkedBlockingQueue<RpcMessage> __QUEUE__WAIT = new LinkedBlockingQueue<>(Integer.MAX_VALUE);
    private ExecutorService __EXECUTOR__ = new ThreadPoolExecutor(0, __CAPACITY__, 60L, TimeUnit.SECONDS, __QUEUE__, new IdsThreadFactory(functionName) {
    });
    //private static ExecutorService __EXECUTOR__WAIT = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2 + 1);

    /**
     * 存放Request对象的阻塞队列，包括客户端的请求消息及数据库线程的返回结果消息
     */
    //private final BlockingQueue<RpcMessage> __QUEUE__ = new LinkedBlockingQueue<RpcMessage>(__CAPACITY__);

    /**
     * 当前分发器的所有处理类
     */
    protected final Map<String, NetHandler> handlers = new HashMap<String, NetHandler>();

    public RpcMessageDispatcher() {
    }

    @Override
    public void run() {
        Out.debug(functionName, " -> ", "启动");
//        while (true) {
//            try {
//                final RpcMessage message = __QUEUE__WAIT.take();
//                __EXECUTOR__.execute(() -> {
//                    try {
//                        long sTime = 0L;
//                        // Out.trace("MessageDispatcher execute消息:" + message);
//                        if (__MONITOR_ENABLE__) {
//                            sTime = System.currentTimeMillis();
//                            String method = message.method;
//                            ScheduledFuture<?> future = FunctionSystem.addDelayJob(() -> {
//                                Out.warn("亲，调试呢(死锁?) -> " + method);
//                            //System.exit(1);
//                            }, 10000);
//                            execute(message);
//                            future.cancel(true);
//                            if (System.currentTimeMillis() - sTime > 100) {
//                                Out.warn(String.format("处理句柄【%s】耗时 -> %s", message.method, System.currentTimeMillis() - sTime));
//                            }
//                        } else {
//                            execute(message);
//                        }
//                    } catch (Exception e) {
//                        Out.error(String.format("处理句柄【%s】出错 -> %s", message.method, e.toString()), e);
//                        e.printStackTrace();
//                    }
//                });
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
//
//        }

        while (true) {
            int size=__QUEUE__WAIT.size();
            if (size>__CAPACITY__){
                size=__CAPACITY__;
            }
            try {
                for (int i = 0; i < size; i++) {
                    try {
                        final RpcMessage message = __QUEUE__WAIT.take();
                        addReceive(message);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(size/100+10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 向消息队列尾部添加一条请求处理的消息，调用此方法的线程不会阻塞 发生队列满等其它异常情况时，请求的消息将丢弃，同时输出error日志
     *
     * @param message
     * @return
     */
    public boolean addReceive(RpcMessage message) {
        Out.trace("MessageDispatcher添加新消息:" + message);
        if (__running__) {
            try {
                __EXECUTOR__.execute(() -> {
                    try {
                        long sTime = 0L;
                        // Out.trace("MessageDispatcher execute消息:" + message);
                        if (__MONITOR_ENABLE__) {
                            sTime = System.currentTimeMillis();
                            String method = message.method;
                            ScheduledFuture<?> future = FunctionSystem.addDelayJob(() -> {
                                Out.warn("亲，调试呢(死锁?) -> " + method);
                                //System.exit(1);
                            }, 10000);
                            execute(message);
                            future.cancel(true);
                            if (System.currentTimeMillis() - sTime > 100) {
                                Out.warn(String.format(Thread.currentThread().getName() + "处理句柄【%s】耗时 -> %s", message.method, System.currentTimeMillis() - sTime));
                            }
                        } else {
                            execute(message);
                        }
                    } catch (Exception e) {
                        Out.error(String.format("处理句柄【%s】出错 -> %s", message.method, e.toString()), e);
                        e.printStackTrace();
                    }
                });
            } catch (RejectedExecutionException e) {
                Out.debug("接收队列已满,消息进入预备队列" + __QUEUE__WAIT.size());
                if (__QUEUE__WAIT.size() > __WARN_COUNT__) {
                    Out.warn("预备队列偏大 -> " + __QUEUE__WAIT.size());
                }
                __QUEUE__WAIT.offer(message);
            }
        }


//        if (__running__) {
//            if (__QUEUE__.size() > __WARN_COUNT__) {
//                Out.warn("接收队列偏大 -> " + __QUEUE__.size());
//            }
//            if (__QUEUE__.offer(message)) {
//                Out.trace("接收队列长度：" + __QUEUE__.size());
//                return true;
//            } else {
//                Out.error("接收队列已满，丢弃了请求：" + message.method);
//                return false;
//            }
//        }
        return false;
    }

    /**
     * 设置关闭
     */
    public void stop() {
        __running__ = false;
    }

    /**
     * 注册处理类
     *
     * @param handler
     * @param handler
     */
    public void registerHandler(NetHandler handler) {
        ClientEvent handle = handler.getClass().getAnnotation(ClientEvent.class);
        // Application.getInstance().addGlobalDispatcher(handle.value(), this);
        handlers.put(handle.value(), handler);
        // ((GameHandler) handler).watcher.handlerName = handle.value();
        Out.trace(functionName, "句柄处理注册成功", " -> ", handle.value());
    }


    /**
     * 处理请求包
     *
     * @param message
     */
    public abstract void execute(RpcMessage message);

}
